{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "28 - Push notifications with workflows",
      "provenance": [],
      "collapsed_sections": []
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "accelerator": "GPU"
  },
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "LjmhJ4ad9kBL"
      },
      "source": [
        "# Push notifications with workflows\n",
        "\n",
        "Workflows are a simple yet powerful construct that takes a callable and returns elements. Workflows stream data and process it in batches, which allows large volumes of data to be handled efficiently.\n",
        "\n",
        "This notebook shows how workflows can push notifications when specific events occur. The same approach can be used to build an activity feed of content."
      ]
    },
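    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To make the streaming, batched behavior concrete, here is a conceptual sketch in plain Python. It is not txtai's implementation: `run_batches` is a hypothetical helper that pulls elements from any iterable in fixed-size batches, applies a callable to each batch and yields the results one at a time.\n",
        "\n",
        "```python\n",
        "from itertools import islice\n",
        "\n",
        "def run_batches(elements, action, size=100):\n",
        "    # Hypothetical stand-in for a workflow task, not part of txtai\n",
        "    iterator = iter(elements)\n",
        "    while True:\n",
        "        # Pull the next batch without materializing the whole input\n",
        "        batch = list(islice(iterator, size))\n",
        "        if not batch:\n",
        "            break\n",
        "        # Apply the callable to the batch, then stream its results\n",
        "        yield from action(batch)\n",
        "\n",
        "doubled = list(run_batches(range(5), lambda batch: [x * 2 for x in batch], size=2))\n",
        "print(doubled)  # [0, 2, 4, 6, 8]\n",
        "```\n",
        "\n",
        "The sketch only holds one batch of input at a time, so a lazy source such as a generator can be arbitrarily large."
      ]
    },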
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "8tLWvo9v-Q0u"
      },
      "source": [
        "# Install dependencies\n",
        "\n",
        "Install `txtai` and all dependencies."
      ]
    },
    {
      "cell_type": "code",
      "metadata": {
        "id": "Fa5BCjMFqVKE"
      },
      "source": [
        "%%capture\n",
        "!pip install datasets git+https://github.com/neuml/txtai#egg=txtai[pipeline,workflow]"
      ],
      "execution_count": 1,
      "outputs": []
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Create workflow notification action\n",
        "\n",
        "Workflows run a series of tasks to transform and process data. This section creates a callable object that can be used as a workflow action.\n",
        "\n",
        "The action below pushes events to [Slack](https://slack.com). While Slack is used here, any notification service can easily be substituted in, such as [Zapier](https://zapier.com/) or [IFTTT](https://ifttt.com/).\n",
        "\n",
        "This section assumes a Slack workspace with an app that is installed and ready to use. [See this comprehensive example](https://api.slack.com/tutorials/tracks/posting-messages-with-curl) for more information on setting up a new Slack app and posting messages via the API.\n",
        "\n",
        "The channel id can be found in the Slack web interface. Log into Slack and click on the channel where messages will be posted. The `channel id` is the last part of the URL.\n",
        "\n",
        "```\n",
        "https://app.slack.com/client/<team id>/<channel id>\n",
        "```"
      ],
      "metadata": {
        "id": "EJ_hHmQtRgQM"
      }
    },
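    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The channel id can also be pulled out of a copied URL programmatically. The sketch below is a hypothetical helper (`channel_id` is not part of Slack's API) that only assumes the `https://app.slack.com/client/<team id>/<channel id>` layout. The team and channel ids in the example are placeholders.\n",
        "\n",
        "```python\n",
        "from urllib.parse import urlparse\n",
        "\n",
        "def channel_id(url):\n",
        "    # Hypothetical helper: expects https://app.slack.com/client/<team id>/<channel id>\n",
        "    parts = [part for part in urlparse(url).path.split('/') if part]\n",
        "    if len(parts) < 3 or parts[0] != 'client':\n",
        "        raise ValueError(f'Unexpected Slack URL: {url}')\n",
        "\n",
        "    # The channel id is the path segment right after the team id\n",
        "    return parts[2]\n",
        "\n",
        "print(channel_id('https://app.slack.com/client/T0XXXXXXX/C0XXXXXXXXX'))  # C0XXXXXXXXX\n",
        "```"
      ]
    },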
    {
      "cell_type": "code",
      "metadata": {
        "id": "3hYRk9JnsM0J"
      },
      "source": [
        "import logging\n",
        "import requests\n",
        "\n",
        "# Uncomment and set. The following are dummy parameters. Your parameters should not be publicly shared!\n",
        "# AUTH = \"xoxb-not-a-real-token-this-will-not-work\"\n",
        "# CHANNEL = \"C0XXXXXXXXX\"\n",
        "# URL = \"https://slack.com/api/chat.postMessage\"\n",
        "\n",
        "# Logging configuration\n",
        "logger = logging.getLogger(__name__)\n",
        "logger.setLevel(logging.INFO)\n",
        "\n",
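        "class Slack:\n",
        "    \"\"\"Workflow action that posts each new alert to a Slack channel.\"\"\"\n",
        "\n",
        "    def __init__(self):\n",
        "        # Ids of alerts already sent, used to skip duplicate notifications\n",
        "        self.alerts = set()\n",
        "\n",
        "    def __call__(self, elements):\n",
        "        for alert in elements:\n",
        "            uid, text, _ = self.extract(alert)\n",
        "            if uid not in self.alerts:\n",
        "                logger.info(\"Sending alert: %s\", alert)\n",
        "                self.alerts.add(uid)\n",
        "\n",
        "                headers = {\n",
        "                    \"Content-type\": \"application/json\",\n",
        "                    \"Authorization\": f\"Bearer {AUTH}\"\n",
        "                }\n",
        "\n",
        "                # Message is the alert text followed by its id (a URL in the examples below)\n",
        "                response = requests.post(URL, headers=headers, json={\n",
        "                    \"channel\": CHANNEL,\n",
        "                    \"text\": f\"{text} {uid}\"\n",
        "                }).json()\n",
        "\n",
        "                # Slack reports API errors in the response body via the \"ok\" field\n",
        "                if not response[\"ok\"]:\n",
        "                    logger.error(response)\n",
        "\n",
        "        # Return elements unchanged so downstream tasks still receive them\n",
        "        return elements\n",
        "\n",
        "    def extract(self, alert):\n",
        "        # SQL search returns dicts; with unpack: false, actions receive (id, text, tags) tuples\n",
        "        if isinstance(alert, dict):\n",
        "            return (alert[\"id\"], alert[\"text\"], None)\n",
        "\n",
        "        return alert\n"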
      ],
      "execution_count": 6,
      "outputs": []
    },
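    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The `Slack` action needs a real token to run. To test its deduplication logic offline, or to swap in another notification service, the HTTP call can be injected instead. The sketch below is a hypothetical variant (`Notifier` and its `send` parameter are not part of txtai or the Slack API). Here `send` just appends each message to a list.\n",
        "\n",
        "```python\n",
        "class Notifier:\n",
        "    def __init__(self, send):\n",
        "        # send: any callable taking the message text, e.g. a wrapper around requests.post\n",
        "        self.send = send\n",
        "        self.alerts = set()\n",
        "\n",
        "    def __call__(self, elements):\n",
        "        for alert in elements:\n",
        "            uid, text, _ = self.extract(alert)\n",
        "            # Notify only once per id, even if the match is found again later\n",
        "            if uid not in self.alerts:\n",
        "                self.alerts.add(uid)\n",
        "                self.send(f'{text} {uid}')\n",
        "\n",
        "        # Return elements unchanged so the action can sit anywhere in a workflow\n",
        "        return elements\n",
        "\n",
        "    def extract(self, alert):\n",
        "        # Same input handling as the Slack action: dicts from SQL search, tuples otherwise\n",
        "        if isinstance(alert, dict):\n",
        "            return (alert['id'], alert['text'], None)\n",
        "\n",
        "        return alert\n",
        "\n",
        "sent = []\n",
        "notify = Notifier(sent.append)\n",
        "notify([{'id': 'https://example.com/a', 'text': 'First'}])\n",
        "notify([{'id': 'https://example.com/a', 'text': 'First'}, ('https://example.com/b', 'Second', None)])\n",
        "print(sent)  # ['First https://example.com/a', 'Second https://example.com/b']\n",
        "```"
      ]
    },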
    {
      "cell_type": "markdown",
      "source": [
        "# Build a semantic notification workflow\n",
        "\n",
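        "Next we'll create a notification workflow. The example below indexes the top trending Hacker News articles and pushes an alert when an article matches an embeddings query for `software development library`.\n",
        "\n",
        "The configuration defines two scheduled workflows. `index` calls the Hacker News search API for front page stories (the `front_page` element fills the `tags` parameter, which is left as `null`) and upserts the article titles into an embeddings index, using each URL as the id. `alert` runs a similarity query against that index and sends matches to the `Slack` action. `iterations: 1` limits each schedule to a single run."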
      ],
      "metadata": {
        "id": "_B4YFu-1R2QC"
      }
    },
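    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Both schedules in the configuration that follows are cron expressions. `index` uses a six-field form in which the last field is seconds, so `* * * * * 0/5` fires every 5 seconds. `alert` uses the standard five-field form, where `0/1 * * * *` fires at the start of every minute. The sketch below assumes these expressions are interpreted by the `croniter` package (expected to be installed by txtai's `workflow` extra) and checks the next run times against the log output of the next cell.\n",
        "\n",
        "```python\n",
        "from datetime import datetime\n",
        "\n",
        "from croniter import croniter\n",
        "\n",
        "# Scheduler start time, taken from the first log line of the next cell\n",
        "start = datetime(2022, 2, 10, 15, 12, 42)\n",
        "\n",
        "# Six fields: the last one is seconds, so this fires every 5 seconds\n",
        "index_next = croniter('* * * * * 0/5', start).get_next(datetime)\n",
        "\n",
        "# Five fields: minute 0/1 means every minute, on the minute\n",
        "alert_next = croniter('0/1 * * * *', start).get_next(datetime)\n",
        "\n",
        "print(index_next)  # 2022-02-10 15:12:45\n",
        "print(alert_next)  # 2022-02-10 15:13:00\n",
        "```"
      ]
    },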
    {
      "cell_type": "code",
      "source": [
        "from txtai.app import Application\n",
        "\n",
        "workflow = \"\"\"\n",
        "writable: true\n",
        "\n",
        "embeddings:\n",
        "  path: sentence-transformers/nli-mpnet-base-v2\n",
        "  content: true\n",
        "\n",
        "tabular:\n",
        "  idcolumn: url\n",
        "  textcolumns:\n",
        "    - title\n",
        "\n",
        "__main__.Slack:\n",
        "\n",
        "workflow:\n",
        "  index:\n",
        "    schedule:\n",
        "      cron: \"* * * * * 0/5\"\n",
        "      elements:\n",
        "        - front_page\n",
        "      iterations: 1\n",
        "    tasks:\n",
        "      - batch: false\n",
        "        extract:\n",
        "          - hits\n",
        "        method: get\n",
        "        params:\n",
        "          tags: null\n",
        "        task: service\n",
        "        url: https://hn.algolia.com/api/v1/search?hitsPerPage=50\n",
        "      - action: tabular\n",
        "      - action: upsert\n",
        "  alert:\n",
        "    schedule:\n",
        "      cron: 0/1 * * * *\n",
        "      elements:\n",
        "        - select id, text, score from txtai where similar('software development library') and score >= 0.4 and id like 'http%'\n",
        "      iterations: 1\n",
        "    tasks:\n",
        "      - action: search\n",
        "      - action: __main__.Slack\n",
        "\"\"\"\n",
        "\n",
        "app = Application(workflow)\n",
        "app.wait()"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "DWEdyXUgIKqW",
        "outputId": "c00e2949-db6f-4b4e-e514-7f0c1fcb14b1"
      },
      "execution_count": 5,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-10 15:12:42,838 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5\n",
            "2022-02-10 15:12:42,839 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *\n",
            "2022-02-10 15:12:42,843 [INFO] schedule: 'index' next run scheduled for 2022-02-10T15:12:45+00:00\n",
            "2022-02-10 15:12:42,851 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T15:13:00+00:00\n",
            "2022-02-10 15:12:45,884 [INFO] schedule: 'index' max iterations (1) reached\n",
            "2022-02-10 15:13:00,042 [INFO] __call__: Sending alert: {'id': 'https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', 'text': 'The world of PostgreSQL wire compatibility', 'score': 0.40123000741004944}\n",
            "2022-02-10 15:13:00,254 [INFO] schedule: 'alert' max iterations (1) reached\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "The log above shows that the indexing and alerting jobs each ran once. There was a single match, and it was sent to Slack. The score threshold of 0.4 is relatively low; raise it if stricter matches are desired.\n",
        "\n",
        "*[Screenshot: Slack notification for the Hacker News match]*"
      ],
      "metadata": {
        "id": "GsDzX_K8Qh0d"
      }
    },
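    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The `score >= 0.4 and id like 'http%'` part of the alert query is an ordinary filter on the rows returned by the similarity search. As an illustration only, `matches` below is a hypothetical helper that applies the same conditions in plain Python to the row reported in the log above; it does not run an embeddings search. Raising the threshold to 0.5 drops that borderline match.\n",
        "\n",
        "```python\n",
        "def matches(rows, threshold=0.4):\n",
        "    # Hypothetical helper mirroring: score >= threshold and id like 'http%'\n",
        "    # (SQLite's LIKE ignores ASCII case, hence the lower())\n",
        "    return [row for row in rows if row['score'] >= threshold and row['id'].lower().startswith('http')]\n",
        "\n",
        "# The single match reported in the log above\n",
        "rows = [{\n",
        "    'id': 'https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html',\n",
        "    'text': 'The world of PostgreSQL wire compatibility',\n",
        "    'score': 0.40123000741004944\n",
        "}]\n",
        "\n",
        "print(len(matches(rows)))       # 1\n",
        "print(len(matches(rows, 0.5)))  # 0\n",
        "```"
      ]
    },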
    {
      "cell_type": "markdown",
      "source": [
        "# Build a notification workflow using SQL\n",
        "\n",
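        "The next example is similar, but instead of a similarity query it runs a SQL filter on another column.\n",
        "\n",
        "This workflow indexes the top trending sports events as identified by [neuspo](https://neuspo.com). An alert is generated when an article's excitement score, stored in the `weight` field, is 40 or above. Setting `content: true` on the `tabular` pipeline keeps all of the article's fields alongside the text, which is what makes `weight` available to the query."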
      ],
      "metadata": {
        "id": "HqU5KNZwWMnR"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from txtai.app import Application\n",
        "\n",
        "workflow = \"\"\"\n",
        "writable: true\n",
        "\n",
        "embeddings:\n",
        "  path: sentence-transformers/nli-mpnet-base-v2\n",
        "  content: true\n",
        "\n",
        "tabular:\n",
        "  idcolumn: url\n",
        "  textcolumns:\n",
        "    - summary\n",
        "  content: true\n",
        "\n",
        "__main__.Slack:\n",
        "\n",
        "workflow:\n",
        "  index:\n",
        "    schedule:\n",
        "      cron: \"* * * * * 0/5\"\n",
        "      elements:\n",
        "        - 10\n",
        "      iterations: 1\n",
        "    tasks:\n",
        "      - batch: false\n",
        "        extract:\n",
        "          - rows\n",
        "        method: get\n",
        "        params:\n",
        "          size: null\n",
        "        task: service\n",
        "        url: https://neuspo.com/data/articles/list?category=Top&from=0\n",
        "      - action: tabular\n",
        "      - action: upsert\n",
        "  alert:\n",
        "    schedule:\n",
        "      cron: 0/1 * * * *\n",
        "      elements:\n",
        "        - select 'https://neuspo.com' || id id, text from txtai where weight >= 40\n",
        "      iterations: 1\n",
        "    tasks:\n",
        "      - action: search\n",
        "      - action: __main__.Slack\n",
        "\"\"\"\n",
        "\n",
        "app = Application(workflow)\n",
        "app.wait()"
      ],
      "metadata": {
        "id": "1oZag3tKWkfe",
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "outputId": "81f5fbea-6585-4afb-c2d9-1e5de3e17f40"
      },
      "execution_count": 6,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-10 15:16:49,702 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5\n",
            "2022-02-10 15:16:49,704 [INFO] schedule: 'index' next run scheduled for 2022-02-10T15:16:50+00:00\n",
            "2022-02-10 15:16:49,704 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *\n",
            "2022-02-10 15:16:49,714 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T15:17:00+00:00\n",
            "2022-02-10 15:16:50,474 [INFO] schedule: 'index' max iterations (1) reached\n",
            "2022-02-10 15:17:00,010 [INFO] __call__: Sending alert: {'id': 'https://neuspo.com/stream/be15c852925b53639b63feb7169a2842', 'text': 'Islanders 6, Canucks 3: Five-goal first period keys Islanders win in first game post-'}\n",
            "2022-02-10 15:17:00,215 [INFO] schedule: 'alert' max iterations (1) reached\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "And the notification in Slack for this job.\n",
        "\n",
        "*[Screenshot: Slack notification for the neuspo alert]*"
      ],
      "metadata": {
        "id": "e1rB3l8pTRtN"
      }
    },
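    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The alert query builds a full link with SQL string concatenation (`||`), aliases the result back to `id` and filters on `weight`. To see what that `select` returns, the sketch below runs the same statement with Python's built-in `sqlite3` module against a throwaway in-memory table. The table is a stand-in rather than txtai's storage schema, and its rows and `weight` values are made up.\n",
        "\n",
        "```python\n",
        "import sqlite3\n",
        "\n",
        "# Throwaway in-memory table standing in for the indexed content (made-up rows)\n",
        "db = sqlite3.connect(':memory:')\n",
        "db.execute('create table txtai (id text, text text, weight integer)')\n",
        "db.executemany('insert into txtai values (?, ?, ?)', [\n",
        "    ('/stream/aaa', 'Exciting game', 45),\n",
        "    ('/stream/bbb', 'Quiet game', 12)\n",
        "])\n",
        "\n",
        "# Same select as the alert workflow: prefix the site URL and alias it back to id\n",
        "query = \"select 'https://neuspo.com' || id id, text from txtai where weight >= 40\"\n",
        "rows = db.execute(query).fetchall()\n",
        "\n",
        "print(rows)  # [('https://neuspo.com/stream/aaa', 'Exciting game')]\n",
        "```"
      ]
    },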
    {
      "cell_type": "markdown",
      "source": [
        "# Search-Summarize-Notify\n",
        "\n",
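        "The next section builds on the Hacker News example. Instead of just sending a notification on a match, this workflow summarizes the matching article first.\n",
        "\n",
        "Other combinations are possible; for example, the summaries could be built at index time. This example does everything on the fly at search time. The alert query selects `id` twice, as `url` and `title`, so the `tabular` action emits `(url, url, None)` tuples. `textractor` then replaces the URL with the article text, `summary` condenses that text, and `unpack: false` passes each full `(id, text, tags)` tuple to the `Slack` action so it still has the URL."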
      ],
      "metadata": {
        "id": "Nsr7vVKFkA11"
      }
    },
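    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The tuple handling in this alert workflow can be sketched in plain Python. This is a simplified illustration, not txtai's code: `unpacked` is a hypothetical wrapper that applies an action to the text element of each `(id, text, tags)` tuple and re-packs the results, mirroring a task's default unpacking. `fake_extract` and `fake_summary` are stand-ins for the textractor and summary pipelines.\n",
        "\n",
        "```python\n",
        "def unpacked(action):\n",
        "    # Hypothetical wrapper: run action on the text element, keep id and tags\n",
        "    def run(elements):\n",
        "        results = action([text for _, text, _ in elements])\n",
        "        return [(uid, result, tags) for (uid, _, tags), result in zip(elements, results)]\n",
        "    return run\n",
        "\n",
        "# Stand-ins for the textractor and summary pipelines (no real models)\n",
        "def fake_extract(urls):\n",
        "    return [f'Full article text downloaded from {url}' for url in urls]\n",
        "\n",
        "def fake_summary(texts):\n",
        "    return [text.split(' downloaded')[0] for text in texts]\n",
        "\n",
        "# The query selects id as both url and title, so tabular yields (url, url, None)\n",
        "rows = [('https://example.com/post', 'https://example.com/post', None)]\n",
        "\n",
        "for action in (unpacked(fake_extract), unpacked(fake_summary)):\n",
        "    rows = action(rows)\n",
        "\n",
        "# With unpack: false, the notification action receives the full tuple\n",
        "print(rows)  # [('https://example.com/post', 'Full article text', None)]\n",
        "```"
      ]
    },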
    {
      "cell_type": "code",
      "source": [
        "from txtai.app import Application\n",
        "\n",
        "workflow = \"\"\"\n",
        "writable: true\n",
        "\n",
        "embeddings:\n",
        "  path: sentence-transformers/nli-mpnet-base-v2\n",
        "  content: true\n",
        "\n",
        "summary:\n",
        "  path: sshleifer/distilbart-cnn-12-6\n",
        "\n",
        "tabular:\n",
        "  idcolumn: url\n",
        "  textcolumns:\n",
        "    - title\n",
        "\n",
        "textractor:\n",
        "  join: true\n",
        "  minlength: 100\n",
        "  paragraphs: true\n",
        "\n",
        "__main__.Slack:\n",
        "\n",
        "workflow:\n",
        "  index:\n",
        "    schedule:\n",
        "      cron: \"* * * * * 0/5\"\n",
        "      elements:\n",
        "        - front_page\n",
        "      iterations: 1\n",
        "    tasks:\n",
        "      - batch: false\n",
        "        extract:\n",
        "          - hits\n",
        "        method: get\n",
        "        params:\n",
        "          tags: null\n",
        "        task: service\n",
        "        url: https://hn.algolia.com/api/v1/search?hitsPerPage=50\n",
        "      - action: tabular\n",
        "      - action: upsert\n",
        "  alert:\n",
        "    schedule:\n",
        "      cron: 0/1 * * * *\n",
        "      elements:\n",
        "        - select id url, id title from txtai where similar('software development library') and score >= 0.4 and id like 'http%'\n",
        "      iterations: 1\n",
        "    tasks:\n",
        "      - action: search\n",
        "      - action: tabular\n",
        "      - action: textractor\n",
        "      - action: summary\n",
        "      - action: __main__.Slack\n",
        "        unpack: false\n",
        "\"\"\"\n",
        "\n",
        "app = Application(workflow)\n",
        "app.wait()"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "SxljaDiPkvv3",
        "outputId": "770ef0cf-f9e3-4e6e-ae18-084422731efb"
      },
      "execution_count": 7,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-10 17:19:48,847 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5\n",
            "2022-02-10 17:19:48,857 [INFO] schedule: 'index' next run scheduled for 2022-02-10T17:19:50+00:00\n",
            "2022-02-10 17:19:48,848 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *\n",
            "2022-02-10 17:19:48,864 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T17:20:00+00:00\n",
            "2022-02-10 17:19:50,368 [INFO] schedule: 'index' max iterations (1) reached\n",
            "2022-02-10 17:20:02,233 [INFO] __call__: Sending alert: ('https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', 'Every server-client database has a wire protocol. A wire protocol is the format for interactions between a database server and its clients. It does NOT encompass the actual query language itself, let alone database semantics. Proprietary databases like Oracle and IBM Db2 find value in developing their own drivers.', None)\n",
            "2022-02-10 17:20:02,496 [INFO] schedule: 'alert' max iterations (1) reached\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "And the result in Slack. Note that the text is now the article summary rather than the title.\n",
        "\n",
        "*[Screenshot: Slack notification with the article summary]*"
      ],
      "metadata": {
        "id": "KqtbguMdpsSO"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Search-Summarize-Translate-Notify\n",
        "\n",
        "One more example to drive this home: the same workflow as the last example, with an added step that translates the summary to French (the `args` entry passes `fr` as the target language)."
      ],
      "metadata": {
        "id": "JNL1UX9aqC2z"
      }
    },
    {
      "cell_type": "code",
      "source": [
        "from txtai.app import Application\n",
        "\n",
        "workflow = \"\"\"\n",
        "writable: true\n",
        "\n",
        "embeddings:\n",
        "  path: sentence-transformers/nli-mpnet-base-v2\n",
        "  content: true\n",
        "\n",
        "summary:\n",
        "  path: sshleifer/distilbart-cnn-12-6\n",
        "\n",
        "tabular:\n",
        "  idcolumn: url\n",
        "  textcolumns:\n",
        "    - title\n",
        "\n",
        "textractor:\n",
        "  join: true\n",
        "  minlength: 100\n",
        "  paragraphs: true\n",
        "\n",
        "translation:\n",
        "\n",
        "__main__.Slack:\n",
        "\n",
        "workflow:\n",
        "  index:\n",
        "    schedule:\n",
        "      cron: \"* * * * * 0/5\"\n",
        "      elements:\n",
        "        - front_page\n",
        "      iterations: 1\n",
        "    tasks:\n",
        "      - batch: false\n",
        "        extract:\n",
        "          - hits\n",
        "        method: get\n",
        "        params:\n",
        "          tags: null\n",
        "        task: service\n",
        "        url: https://hn.algolia.com/api/v1/search?hitsPerPage=50\n",
        "      - action: tabular\n",
        "      - action: upsert\n",
        "  alert:\n",
        "    schedule:\n",
        "      cron: 0/1 * * * *\n",
        "      elements:\n",
        "        - select id url, id title from txtai where similar('software development library') and score >= 0.4 and id like 'http%'\n",
        "      iterations: 1\n",
        "    tasks:\n",
        "      - action: search\n",
        "      - action: tabular\n",
        "      - action: textractor\n",
        "      - action: summary\n",
        "      - action: translation\n",
        "        args:\n",
        "          - fr\n",
        "      - action: __main__.Slack\n",
        "        unpack: false\n",
        "\"\"\"\n",
        "\n",
        "app = Application(workflow)\n",
        "app.wait()"
      ],
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "nooA9cFNqNhb",
        "outputId": "febdcfe8-34ce-4952-fb94-ccb5c200ec35"
      },
      "execution_count": 9,
      "outputs": [
        {
          "output_type": "stream",
          "name": "stderr",
          "text": [
            "2022-02-10 17:25:04,448 [INFO] schedule: 'index' scheduler started with schedule * * * * * 0/5\n",
            "2022-02-10 17:25:04,449 [INFO] schedule: 'alert' scheduler started with schedule 0/1 * * * *\n",
            "2022-02-10 17:25:04,451 [INFO] schedule: 'index' next run scheduled for 2022-02-10T17:25:05+00:00\n",
            "2022-02-10 17:25:04,457 [INFO] schedule: 'alert' next run scheduled for 2022-02-10T17:26:00+00:00\n",
            "2022-02-10 17:25:05,357 [INFO] schedule: 'index' max iterations (1) reached\n",
            "2022-02-10 17:26:08,125 [INFO] __call__: Sending alert: ('https://datastation.multiprocess.io/blog/2022-02-08-the-world-of-postgresql-wire-compatibility.html', \"Chaque base de données serveur-client a un protocole filaire. Un protocole filaire est le format pour les interactions entre un serveur de base de données et ses clients. Il n'inclut PAS le langage de requête réel lui-même, et encore moins la sémantique de la base de données.\", None)\n",
            "2022-02-10 17:26:08,310 [INFO] schedule: 'alert' max iterations (1) reached\n"
          ]
        }
      ]
    },
    {
      "cell_type": "markdown",
      "source": [
        "Just like before, Slack shows a summary and a link, but this time in French!\n",
        "\n",
        "*[Screenshot: Slack notification with the French summary]*"
      ],
      "metadata": {
        "id": "pSi8e8rGuAah"
      }
    },
    {
      "cell_type": "markdown",
      "source": [
        "# Wrapping up\n",
        "\n",
        "This notebook covered how to build workflow notifications with txtai. There are many directions to take this, such as building an activity feed or alerting when semantic events occur. More ideas can be found in the [txtai application](https://huggingface.co/spaces/NeuML/txtai) on Hugging Face Spaces.\n",
        "\n",
        "Everything in this notebook can also be written in Python. The benefit of YAML workflows is that they require little to no code. As of txtai 4.1, work is ongoing to make workflows easier to containerize and ultimately run in serverless environments. Keep an eye on this!"
      ],
      "metadata": {
        "id": "Fr99QHPtTMJt"
      }
    }
  ]
}
